[Notebook] Dask and Xarray on AWS-HPC cluster: distributed processing of Earth data
This notebook continues the previous post by showing the actual code for distributed data processing.
In [1]:
%matplotlib inline
import xarray as xr
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import dask
from dask.diagnostics import ProgressBar
from dask_jobqueue import SLURMCluster
import distributed
from distributed import Client, progress
In [2]:
dask.__version__, distributed.__version__
Out[2]:
In [3]:
%env HDF5_USE_FILE_LOCKING=FALSE
Data exploration¶
Data are organized by year/month:
In [4]:
ls /fsx
In [5]:
ls /fsx/2008/
In [6]:
ls /fsx/2008/01/data # one variable per file
In [7]:
# hourly data over a month
dr = xr.open_dataarray('/fsx/2008/01/data/sea_surface_temperature.nc')
dr
Out[7]:
In [8]:
# Static plot of the first time slice
fig, ax = plt.subplots(1, 1, figsize=[12, 8], subplot_kw={'projection': ccrs.PlateCarree()})
dr[0].plot(ax=ax, transform=ccrs.PlateCarree(), cbar_kwargs={'shrink': 0.6})
ax.coastlines();
What happens to the values over the land? This is easier to check with an interactive plot.
In [9]:
import geoviews as gv
import hvplot.xarray
fig_hv = dr[0].hvplot.quadmesh(
    x='lon', y='lat', rasterize=True, cmap='viridis', geo=True,
    crs=ccrs.PlateCarree(), projection=ccrs.PlateCarree(), project=True,
    width=800, height=400,
) * gv.feature.coastline
# fig_hv
In [10]:
# This is just a hack to display the figure in a Nikola blog post.
# If you know an easier way, let me know.
import holoviews as hv
from bokeh.resources import CDN, INLINE
from bokeh.embed import file_html
from IPython.display import HTML
HTML(file_html(hv.render(fig_hv), CDN))
Out[10]:
So it turns out that the "temperature" over land is set to 273.16 K (about 0 degrees Celsius). It would probably be better to mask those points out.
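A minimal sketch of that masking step, assuming the land points are flagged exactly by the 273.16 K fill value seen in the plot (`mask_land` is a hypothetical helper, not part of this dataset's API):

```python
import numpy as np
import xarray as xr

LAND_FILL = 273.16  # assumed fill value over land, taken from the plot above

def mask_land(sst):
    """Replace land fill values with NaN so they are excluded from statistics."""
    # xarray's where() keeps values where the condition holds and fills NaN elsewhere
    return sst.where(sst != LAND_FILL)
```

With the land set to NaN, reductions such as `mean()` skip those points by default (`skipna=True`).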
Serial read with master node¶
Let's read one year of data first.
In [11]:
# Just querying the metadata causes files to be pulled from S3 into FSx.
# This takes a while on the first execution; it is much faster the second time.
%time ds_1yr = xr.open_mfdataset('/fsx/2008/*/data/sea_surface_temperature.nc', chunks={'time0': 50})
dr_1yr = ds_1yr['sea_surface_temperature']
dr_1yr
Out[11]:
The aggregated size is ~29 GB:
In [12]:
dr_1yr.nbytes / 1e9 # GB
Out[12]:
In [13]:
with ProgressBar():
    mean_1yr_ser = dr_1yr.mean().compute()
In [14]:
mean_1yr_ser
Out[14]:
Parallel read with dask cluster¶
Cluster initialization¶
In [15]:
!sinfo # the 8 spun-up nodes are idle
In [16]:
!mkdir -p ./dask_tempdir
In [17]:
# Reference: https://jobqueue.dask.org/en/latest/configuration.html
# - "cores" is the number of CPUs used per Slurm job.
#   Here we fix it at 72, the number of vCPUs on a c5n.18xlarge node, so one Slurm job gets exactly one node.
# - "processes" specifies the number of dask workers in a single Slurm job.
# - "memory" specifies the memory requested by a single Slurm job.
cluster = SLURMCluster(cores=72, processes=36, memory='150GB',
                       local_directory='./dask_tempdir')
In [18]:
# 8 nodes * 36 workers/node
cluster.scale(8*36)
cluster
Visit http://localhost:8787 for the dashboard.
In [19]:
# remember to also create a dask Client to talk to the cluster!
client = Client(cluster) # automatically switches to distributed mode
client
Out[19]:
In [20]:
# now the default scheduler is dask.distributed
dask.config.get('scheduler')
Out[20]:
In [21]:
!sinfo # now fully allocated
In [22]:
!squeue # all are dask worker jobs, one per compute node
Read 1-year data¶
In [23]:
# Strictly speaking there is no need to reopen the files -- the previous dask graph
# could simply be reused on the cluster. Here we reopen with smaller chunks to get more parallel tasks.
ds_1yr = xr.open_mfdataset('/fsx/2008/*/data/sea_surface_temperature.nc', chunks={'time0': 25})
dr_1yr = ds_1yr['sea_surface_temperature']
dr_1yr
Out[23]:
In [24]:
%time mean_1yr_par = dr_1yr.mean().compute()
The effective read bandwidth is about 29 GB / 5 s ≈ 6 GB/s.
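The back-of-the-envelope number above can be automated. Here is a small hypothetical helper (not used elsewhere in this notebook) that times a reduction and reports the effective throughput from the array's `nbytes`:

```python
import time

def timed_mean(dataarray):
    """Compute the mean of a dask-backed array and report effective throughput.

    Works with any object exposing `.nbytes` and a lazy `.mean().compute()`,
    such as an xarray DataArray.
    """
    start = time.perf_counter()
    result = dataarray.mean().compute()  # triggers the actual read + reduction
    elapsed = time.perf_counter() - start
    gb = dataarray.nbytes / 1e9
    print(f"{gb:.1f} GB in {elapsed:.1f} s -> {gb / elapsed:.1f} GB/s")
    return result
```

Note this measures end-to-end wall time, so the first call includes the S3-to-FSx transfer.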
In [25]:
mean_1yr_par.equals(mean_1yr_ser) # consistent with serial result
Out[25]:
In [26]:
len(dr_1yr.chunks[0]) # not that many chunks for dask to work with, yet the computation is still very fast
Out[26]:
Read multi-year data¶
For this part you might get a "Too many open files" error. If so, run sudo sh -c "ulimit -n 65535 && exec su $LOGNAME" to raise the limit before starting Jupyter (ref: https://stackoverflow.com/a/17483998).
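As an alternative to that shell workaround, the soft open-file limit can also be raised from inside the Python process with the standard-library `resource` module (Unix only). `raise_file_limit` below is a hypothetical sketch; it only affects the current process (and children started afterwards), so it must run before the files are opened:

```python
import resource

def raise_file_limit(target=65535):
    """Raise this process's soft open-file limit, capped at the hard limit."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    # An unprivileged process may raise the soft limit only up to the hard limit
    if hard == resource.RLIM_INFINITY:
        new_soft = target
    else:
        new_soft = min(target, hard)
    resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
    return new_soft
```

This does not help worker processes launched by Slurm, which inherit their limits from the batch environment instead.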
In [27]:
file_list = [f'/fsx/{year}/{month:02d}/data/sea_surface_temperature.nc'
             for year in range(2008, 2018) for month in range(1, 13)]
len(file_list) # many files
Out[27]:
In [28]:
file_list[0:3], file_list[-3:]
Out[28]:
In [29]:
# This pulls the data from S3 into FSx.
# It takes a long time on the first execution; much faster the second time.
%time ds_10yr = xr.open_mfdataset(file_list, chunks={'time0': 50})
dr_10yr = ds_10yr['sea_surface_temperature']
dr_10yr
Out[29]:
Nearly 300 GB!
In [31]:
dr_10yr.nbytes / 1e9 # GB
Out[31]:
In [34]:
%time mean_10yr = dr_10yr.mean().compute()
That is roughly 260 GB / 24 s ≈ 10 GB/s?!
In [35]:
%time ts_10yr = dr_10yr.mean(dim=['lat', 'lon']).compute()
In [36]:
ts_10yr.plot(size=6)
Out[36]: